Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from datetime import datetime
from decimal import Decimal
Template
spark = (
    SparkSession.builder
    .master("local")
    .appName("Section 2.10 - Spark Functions aren't Enough, I Need my Own!")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext
import os
data_path = "/data/pets.csv"
base_path = os.path.dirname(os.getcwd())
path = base_path + data_path
pets = spark.read.csv(path, header=True)
pets.toPandas()
 | id | breed_id | nickname | birthday | age | color
---|---|---|---|---|---|---
0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown
1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None
2 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None
3 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white
Spark Functions aren't Enough, I Need my Own!
What happens when you can't find a function that does what you want? Try looking again 🤪. But if that really is the case, your last resort is to implement a udf, short for user defined function.

These are functions written in python that take a subset of columns as input and return a new column back. There are multiple steps in creating a udf; we'll walk through one and decompose it step by step.
Option 1: Steps
# Step 1: Create your own function
def uppercase_words(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None

s = 'Hello World!'
print(s)
print(uppercase_words(s, 20))
# Step 2: Register the udf as a spark udf
uppercase_words_udf = F.udf(uppercase_words, T.StringType())
# Step 3: Use it!
(
    pets
    .withColumn('nickname_uppercase', uppercase_words_udf(F.col('nickname')))
    .withColumn('color_uppercase', uppercase_words_udf(F.col('color')))
    .withColumn('color_uppercase_trimmed', uppercase_words_udf(F.col('color'), F.lit(3)))
    .toPandas()
)
Hello World!
HELLO WORLD!
 | id | breed_id | nickname | birthday | age | color | nickname_uppercase | color_uppercase | color_uppercase_trimmed
---|---|---|---|---|---|---|---|---|---
0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown | KI | BR | BRO
1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None | AR | None | None
2 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None | CH | None | None
3 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white | MA | WH | WHI
What Happened?
Although an upper function is already defined in the spark functions library, it still serves as a good example. Let's break down the steps involved:

- Create the function that you want (uppercase_words), remembering that only spark columnar objects are accepted as input arguments to the function. This means that if you want to pass in other values, you will need to cast them to column objects using F.lit() from the previous sections.
- Register the python function as a spark function, and specify the spark return type. The format is F.udf(python_function, spark_return_type) (a sketch with a non-string return type follows this list).
- Now you can use the function!
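As a minimal sketch of the return-type argument, here is a hypothetical nickname_length function (the name and logic are made up for illustration) that returns an integer, so it is registered with T.IntegerType() instead of T.StringType():

# Hypothetical example: the spark return type must match what the
# python function actually returns, here an integer.
def nickname_length(word):
    return len(word) if word else None

nickname_length_udf = F.udf(nickname_length, T.IntegerType())

(
    pets
    .withColumn('nickname_length', nickname_length_udf(F.col('nickname')))
    .toPandas()
)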
Option 2: 1 Less Step
from pyspark.sql.functions import udf

# Step 1: Create and register your own function
@udf('string')
def uppercase_words(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None

# Step 2: Use it!
(
    pets
    .withColumn('color_uppercase', uppercase_words(F.col('color')))
    .toPandas()
)
 | id | breed_id | nickname | birthday | age | color | color_uppercase
---|---|---|---|---|---|---|---
0 | 1 | 1 | King | 2014-11-22 12:30:31 | 5 | brown | BR
1 | 2 | 3 | Argus | 2016-11-22 10:05:10 | 10 | None | None
2 | 3 | 1 | Chewie | 2016-11-22 10:05:10 | 15 | None | None
3 | 3 | 2 | Maple | 2018-11-22 10:05:10 | 17 | white | WH
What Happened?
The udf function can also be used as a decorator to register your python functions as spark functions. The argument passed to the decorator is the spark return type of the udf, not the types of its input arguments.
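As a rough sketch (the function names here are just for illustration), the decorator accepts either a DDL type string or a DataType object; both registrations below should be equivalent:

# Equivalent registrations: a DDL type string or a DataType object.
@udf('string')
def uppercase_words_ddl(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None

@udf(returnType=T.StringType())
def uppercase_words_datatype(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None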
The Ugly Part of udfs
TL;DR Spark functions are executed on the JVM, while Python udfs are executed in Python. This requires extra python memory for your spark application (explained in Chapter 6) and more passing of data between the JVM and Python.

If your transformation can be performed with the spark functions, you should always use the spark functions; udfs perform very poorly compared to them. This is a great response that encapsulates the reason why:
> The main reasons are already enumerated above and can be reduced to a simple fact that Spark DataFrame is natively a JVM structure and standard access methods are implemented by simple calls to Java API. UDF from the other hand are implemented in Python and require moving data back and forth.
>
> While PySpark in general requires data movements between JVM and Python, in case of low level RDD API it typically doesn't require expensive serde activity. Spark SQL adds additional cost of serialization and serialization as well cost of moving data from and to unsafe representation on JVM. The later one is specific to all UDFs (Python, Scala and Java) but the former one is specific to non-native languages.
>
> Unlike UDFs, Spark SQL functions operate directly on JVM and typically are well integrated with both Catalyst and Tungsten. It means these can be optimized in the execution plan and most of the time can benefit from codgen and other Tungsten optimizations. Moreover these can operate on data in its "native" representation.
>
> So in a sense the problem here is that Python UDF has to bring data to the code while SQL expressions go the other way around.
source: https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance
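For this particular example the udf isn't actually needed; a rough equivalent using only built-in spark functions (which stay on the JVM and can be optimized by Catalyst/Tungsten) might look like this:

# Same result as uppercase_words_udf, but with built-in functions:
# uppercase the column, then keep the first 2 characters.
(
    pets
    .withColumn('nickname_uppercase', F.substring(F.upper(F.col('nickname')), 1, 2))
    .withColumn('color_uppercase', F.substring(F.upper(F.col('color')), 1, 2))
    .toPandas()
)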
Option 3: Pandas Vectorized UDFs
# TODO: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
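Until this section is written, here is a minimal sketch of what a vectorized udf could look like, assuming pyspark >= 2.3 with pyarrow installed (the function name is just for illustration); it operates on whole pandas Series at a time instead of one value at a time:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Vectorized: receives and returns a pandas Series, avoiding per-row overhead.
@pandas_udf('string')
def uppercase_words_vectorized(words):
    return words.str.upper().str[:2]

(
    pets
    .withColumn('color_uppercase', uppercase_words_vectorized(F.col('color')))
    .toPandas()
)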
Summary
- We learnt how to use python functions within spark, called udfs.
- We learnt how to pass non-column objects into the function by using knowledge gained from previous chapters.
- We learnt about the bad parts of udfs and their performance issues.